#!/usr/bin/env python3
# Processes updates to PostgreSQL full-text search for new/edited messages.
#
# Zulip manages its PostgreSQL full-text search as follows.  When the
# content of a message is modified, a PostgreSQL trigger logs the
# message ID to the `fts_update_log` table.  In the background, this
# program processes `fts_update_log`, updating the PostgreSQL full-text
# search column search_tsvector in the main zerver_message table.
import sys

# In production we want to use the virtualenv of the deployment in
# /home/zulip/deployments/current, so we add that path to sys.path and
# then call setup_path().
#
# This file is also used in development, where that path does not exist
# and `from scripts.lib.setup_path import setup_path` raises ImportError.
# In development the virtualenv is already in use, so we catch the
# ImportError and skip this step.
#
# Note that the try block covers the setup_path() call as well, so an
# ImportError raised from inside setup_path() is also silently ignored.
sys.path.append("/home/zulip/deployments/current")
try:
    from scripts.lib.setup_path import setup_path

    setup_path()
except ImportError:
    pass

import argparse
import configparser
import logging
import os
import select
import sys
import time

import psycopg2
import psycopg2.extensions

# Maximum number of fts_update_log rows fetched and processed per batch.
BATCH_SIZE = 1000

parser = argparse.ArgumentParser()
parser.add_argument("--quiet", action="store_true")
options = parser.parse_args()

# Render log timestamps with time.gmtime (i.e. in UTC) rather than local time.
logging.Formatter.converter = time.gmtime
logging.basicConfig(format="%(asctime)s %(levelname)s: %(message)s")
logger = logging.getLogger("process_fts_updates")
# --quiet hides DEBUG messages; without it everything from DEBUG up is logged.
logger.setLevel(logging.INFO if options.quiet else logging.DEBUG)


def update_fts_columns(cursor: psycopg2.extensions.cursor) -> int:
    """Process one batch of pending rows from `fts_update_log`.

    Fetches up to BATCH_SIZE (id, message_id) rows from the log,
    recomputes `search_tsvector` (and `search_pgroonga` when
    USING_PGROONGA is set) for each referenced message, and then deletes
    the processed log rows.

    Args:
        cursor: An open cursor on the database holding `fts_update_log`
            and `zerver_message`.

    Returns:
        The number of log rows processed.  A value smaller than
        BATCH_SIZE means the SELECT returned fewer rows than a full
        batch.
    """
    cursor.execute(
        "SELECT id, message_id FROM fts_update_log LIMIT %s;",
        [BATCH_SIZE],
    )
    pending = cursor.fetchall()
    if not pending:
        # Nothing is queued.  Return before the DELETE, which would
        # otherwise be sent with an empty id array and cost a round trip
        # for no effect (this happens once per redundant notification).
        return 0

    processed_ids = []
    for log_id, message_id in pending:
        if USING_PGROONGA:
            cursor.execute(
                "UPDATE zerver_message SET "
                "search_pgroonga = "
                "escape_html(subject) || ' ' || rendered_content "
                "WHERE id = %s",
                (message_id,),
            )
        cursor.execute(
            "UPDATE zerver_message SET "
            "search_tsvector = to_tsvector('zulip.english_us_search', "
            "subject || rendered_content) "
            "WHERE id = %s",
            (message_id,),
        )
        processed_ids.append(log_id)

    # Remove exactly the log rows handled above; rows logged in the
    # meantime have different ids and stay queued for a later batch.
    cursor.execute("DELETE FROM fts_update_log WHERE id = ANY(%s)", (processed_ids,))
    return len(processed_ids)


def am_master(cursor: psycopg2.extensions.cursor) -> bool:
    """Report whether the connected PostgreSQL server is out of recovery.

    Runs `SELECT pg_is_in_recovery()` and returns the negation of the
    value in the first column of the first result row, so the result is
    True when the server reports that it is not in recovery.
    """
    cursor.execute("SELECT pg_is_in_recovery()")
    rows = cursor.fetchall()
    in_recovery = rows[0][0]
    return not in_recovery


# Keyword arguments later passed to psycopg2.connect(); filled in below
# from the Django settings when those are importable.
pg_args = {}

# Path to the root of the Zulip codebase in production
sys.path.insert(0, "/home/zulip/deployments/current")
# Path to the root of the Zulip codebase in development.  This is
# inserted last, so it ends up ahead of the production path on sys.path.
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "../../../..")))
try:
    os.environ["DJANGO_SETTINGS_MODULE"] = "zproject.settings"
    from django.conf import settings

    # An empty string means "not configured"; only non-empty values are
    # copied into the connection arguments.
    if settings.REMOTE_POSTGRES_HOST != "":
        pg_args["host"] = settings.REMOTE_POSTGRES_HOST
    if settings.REMOTE_POSTGRES_PORT != "":
        pg_args["port"] = settings.REMOTE_POSTGRES_PORT
    USING_PGROONGA = settings.USING_PGROONGA
except ImportError:
    # process_fts_updates also supports running directly on the
    # PostgreSQL server, where the Zulip settings may not be importable;
    # in that case, one can just connect to localhost.
    USING_PGROONGA = False

# Since we don't want a hard dependency on being able to access the
# Zulip settings (as we may not be running on a server that has that
# data), we determine whether we're using PGroonga using
# /etc/zulip/zulip.conf.
#
# However, we still also check the `USING_PGROONGA` variable, since
# that's all we have in development.
#
# NOTE: only the presence of the `pgroonga` option in the `machine`
# section is checked; its value is not read.
config_file = configparser.RawConfigParser()
config_file.read("/etc/zulip/zulip.conf")
if config_file.has_option("machine", "pgroonga"):
    USING_PGROONGA = True

# "host" is only ever set inside the try block above, after the settings
# import succeeded, so `settings` is bound whenever this branch is taken.
if "host" in pg_args:
    # Use an empty password unless the settings provide one.
    pg_args["password"] = ""
    if settings.DATABASES["default"]["PASSWORD"] is not None:
        pg_args["password"] = settings.DATABASES["default"]["PASSWORD"]
    pg_args["user"] = settings.DATABASES["default"]["USER"]
    pg_args["dbname"] = settings.DATABASES["default"]["NAME"]
    # Fall back to "verify-full" when no sslmode is configured.
    if settings.REMOTE_POSTGRES_SSLMODE != "":
        pg_args["sslmode"] = settings.REMOTE_POSTGRES_SSLMODE
    else:
        pg_args["sslmode"] = "verify-full"
    pg_args["connect_timeout"] = "600"
else:
    # No remote host configured: only the user name is set here (plus
    # the port, if one was configured above).
    pg_args["user"] = "zulip"

# Current database connection.  None means "not connected" and makes the
# loop below (re)connect on its next iteration.
conn = None

# Number of OperationalErrors tolerated before giving up.  It starts at
# 1, so an error before the first successful connection is fatal, and is
# reset to 30 each time a connection is established.
retries = 1

while True:
    try:
        if conn is None:
            conn = psycopg2.connect(**pg_args)
            cursor = conn.cursor()
            retries = 30

            conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)

            # Wait until the server is no longer in recovery, polling
            # every 5 seconds and warning only on the first check.
            first_check = True
            while not am_master(cursor):
                if first_check:
                    first_check = False
                    logger.warning("In recovery; sleeping")
                time.sleep(5)

            logger.debug("process_fts_updates: listening for search index updates")

            # LISTEN is issued before the catch-up pass, presumably so
            # that notifications sent while catching up are not missed.
            cursor.execute("LISTEN fts_update_log;")
            # Catch up on any backlog already present in fts_update_log
            while True:
                rows_updated = update_fts_columns(cursor)
                notice = f"process_fts_updates: Processed {rows_updated} rows catching up"
                if rows_updated > 0:
                    logger.info(notice)
                else:
                    logger.debug(notice)

                # A batch smaller than BATCH_SIZE means the backlog is
                # drained.
                if rows_updated != BATCH_SIZE:
                    # We're caught up, so proceed to the listening for updates phase.
                    break

        # TODO: If we go back into recovery, we should stop processing updates
        # Wait up to 30 seconds for the connection to become readable,
        # then process one batch for each pending notification.
        if select.select([conn], [], [], 30) != ([], [], []):
            conn.poll()
            while conn.notifies:
                conn.notifies.pop()
                update_fts_columns(cursor)
    except psycopg2.OperationalError as e:
        # Spend one retry; re-raise once the budget is exhausted.
        retries -= 1
        if retries <= 0:
            raise
        logger.info(e.pgerror, exc_info=True)
        logger.info("Sleeping and reconnecting")
        time.sleep(5)
        # Drop the connection so that the next iteration reconnects.
        if conn is not None:
            conn.close()
            conn = None
    except KeyboardInterrupt:
        # Leave the loop (and thereby end the script) on Ctrl-C.
        print(sys.argv[0], "exited after receiving KeyboardInterrupt")
        break
